Spark 3.2: Fix predicate pushdown in row-level operations #4023

aokolnychyi merged 1 commit into apache:master
Conversation
The branch was force-pushed from 76317e5 to 383d2d6.
```diff
-    val (scan, output) = PushDownUtils.pruneColumns(
-      scanBuilder, relation, relation.output, Seq.empty)
+    val (scan, output) = PushDownUtils.pruneColumns(scanBuilder, relation, relation.output, Nil)
```
Nit: is the change from `Seq.empty` to `Nil` necessary?
I changed it so that it fits on one line, just like the new filter pushdown logic.
Hi @aokolnychyi @szehon-ho, is there any reason for not passing `pushedFilters` here instead of `Nil`?
actually nvm, this is only to prune columns.
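For background, column pruning and filter pushdown are independent capabilities on a DSv2 `ScanBuilder`, which is why this call needs no filters. A minimal sketch of that separation (`configureScan` is a hypothetical helper, not Iceberg or Spark code):

```scala
import org.apache.spark.sql.connector.read.{ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Hypothetical helper: the two pushdown concerns use separate mix-in interfaces.
def configureScan(builder: ScanBuilder, requiredSchema: StructType, filters: Array[Filter]): Unit = {
  builder match {
    case fb: SupportsPushDownFilters =>
      // pushFilters returns the filters the source could not fully handle
      val postScanFilters = fb.pushFilters(filters)
      println(s"post-scan filters: ${postScanFilters.mkString(", ")}")
    case _ => // source does not support filter pushdown
  }
  builder match {
    case cb: SupportsPushDownRequiredColumns =>
      cb.pruneColumns(requiredSchema) // column pruning needs no filter information
    case _ => // source does not support column pruning
  }
}
```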
singhpk234 left a comment:
Looks good to me. Thanks @aokolnychyi!!!
```scala
    tableAttrs: Seq[AttributeReference]): (Seq[Filter], Seq[Expression]) = {

  val tableAttrSet = AttributeSet(tableAttrs)
  val filters = splitConjunctivePredicates(cond).filter(_.references.subsetOf(tableAttrSet))
```
Was this what was preventing pushdown before? We weren't filtering out expressions that referenced columns outside of the table?
Yes, previously we did not split the condition into parts, and we did not remove filters that referenced both tables.
@szehon-ho ^^
This comment provides a bit more info to answer your question above.
We treated `t.id = s.id AND t.dep IN ('hr')` as a single predicate that couldn't be converted because it referenced both tables. Instead, we now split it into parts and convert whatever we can (i.e. `t.dep IN ('hr')` in this case); see the sketch below.
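To make that concrete, here is a minimal sketch of the splitting logic (`splitConjuncts` and `convertibleConjuncts` are hypothetical names mirroring Catalyst's `splitConjunctivePredicates`, not the exact Iceberg code):

```scala
import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression}

// Break the condition on ANDs into individual conjuncts.
def splitConjuncts(cond: Expression): Seq[Expression] = cond match {
  case And(left, right) => splitConjuncts(left) ++ splitConjuncts(right)
  case other => Seq(other)
}

// Keep only the conjuncts whose references fall entirely within the
// target table's attributes; those are the ones that can be pushed down.
def convertibleConjuncts(cond: Expression, tableAttrs: AttributeSet): Seq[Expression] =
  splitConjuncts(cond).filter(_.references.subsetOf(tableAttrs))

// For t.id = s.id AND t.dep IN ('hr'), only t.dep IN ('hr') survives the
// filter, so it alone is converted and pushed to the scan.
```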
```java
Snapshot mergeSnapshot = table.currentSnapshot();
String deletedDataFilesCount = mergeSnapshot.summary().get(SnapshotSummary.DELETED_FILES_PROP);
Assert.assertEquals("Must overwrite only 1 file", "1", deletedDataFilesCount);
```
Other tests use the listener to check the expressions that were pushed down directly. Should we do that in this test?
I think I missed that. Could you point me to an example?
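For illustration, one way such a listener-based check can look in Spark (a sketch with hypothetical table names, assuming the executed plan's string form reports the scan's pushed filters; this is not the Iceberg test helper):

```scala
import scala.collection.mutable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Sketch: capture executed plans, then assert on the pushed filters.
class PlanCapturingListener extends QueryExecutionListener {
  val plans = mutable.Buffer.empty[QueryExecution]
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    plans += qe
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

val spark = SparkSession.active
val listener = new PlanCapturingListener
spark.listenerManager.register(listener)
spark.sql(
  "MERGE INTO target t USING source s " +
  "ON t.id = s.id AND t.dep IN ('hr') " +
  "WHEN MATCHED THEN UPDATE SET t.dep = s.dep")
// Listener callbacks are asynchronous; real tests wait for the listener bus
// to drain before asserting.
assert(listener.plans.exists(_.executedPlan.toString.contains("dep IN ('hr')")))
spark.listenerManager.unregister(listener)
```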
Thanks for reviewing, @singhpk234 @szehon-ho @rdblue!
(cherry picked from commit 5d599e1)
This PR fixes predicate pushdown in row-level operations in Spark 3.2. Previously, we would not extract filters, so MERGE conditions such as `t.id = s.id AND t.dep IN ('hr')` would not be pushed down.
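An illustrative end-to-end example (hypothetical target/source tables) of the kind of MERGE whose condition now gets partially pushed down:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.active

// Only t.dep IN ('hr') references the target table alone, so after this fix
// it is converted and pushed to the Iceberg scan; t.id = s.id still has to
// be evaluated as part of the join.
spark.sql(
  """MERGE INTO target t
    |USING source s
    |ON t.id = s.id AND t.dep IN ('hr')
    |WHEN MATCHED THEN UPDATE SET t.dep = s.dep
    |""".stripMargin)
```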